{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "5c571ec2-8505-49c2-8804-c910ad83d8e1",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "# RDD的介绍"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "406e4755-7792-47d3-b4e8-8fd2a8533f42",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "环境初始化\n",
    "> 首先执行环境的初始化。  \n",
    "> 将存储账户与Spark环境关联，以便于在Spark程序中可以使用存储。  \n",
    "> `dfs_endpoint` 是文件系统的根端点。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "b5ac740e-40f5-4bda-a0b2-e7c8b2b2e77d",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "%run \"../../../initialization\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "245eff14-de2f-4880-a006-78dc0c08b8bf",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 为什么需要RDD\n",
    "分布式计算需要\n",
    "* 分区控制\n",
    "* Shuffle控制\n",
    "* 数据存储\\序列化\\发送\n",
    "* 数据计算API\n",
    "* 等等\n",
    "\n",
    "这些功能，不能简单的通过Python内置的本地集合对象(如 List)去完成。\n",
    "\n",
    "我们在分布式框架中，我们需要一个统一的数据抽象来实现上述分布式计算所需的功能，这个数据抽象就是RDD。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "3fe935fe-876e-4efb-ac85-ffd791672e0c",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 什么是RDD\n",
    "RDD(Resilient Distributed Dataset)叫做弹性分布式数据集，是Spark中最基本的数据抽象，代表一个不可变、可分区、元素可并行计算的集合。\n",
    "\n",
    "* Dataset：一个数据集合，用于存放数据的。\n",
    "* Distributed：RDD中的数据是分布式存储的，可用于分布式计算。\n",
    "* Resilient：RDD中的数据可以存储在内存中或者磁盘中。\n",
    "\n",
    "RDD\n",
    "* 不可变：immutable\n",
    "  * 不可变集合\n",
    "  * 变量的声明使用val\n",
    "* 可分区：partitioned\n",
    "  * 集合的数据被划分为很多部分\n",
    "  * 每部分称为分区 partition\n",
    "* 并行计算：parallel\n",
    "  * 集合中的数据可以被并行的计算处理\n",
    "  * 每个分区数据被一个Task任务处理\n",
    "\n",
    "所有的运算以及操作都建立在RDD数据结构的基础之上。\n",
    "\n",
    "可以认为RDD是分布式的列表List或数组Array，抽象的数据结构，RDD是一个抽象类Abstract Class和泛型Generic Type。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "dd8e84a2-40dc-4cd7-abc9-b47054ed4cf1",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "%scala\n",
    "/**\n",
    " * Internally, each RDD is characterized by five main properties:\n",
    " *\n",
    " *  - A list of partitions\n",
    " *  - A function for computing each split\n",
    " *  - A list of dependencies on other RDDs\n",
    " *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)\n",
    " *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for\n",
    " *    an HDFS file)\n",
    " */\n",
    "abstract class RDD[T: ClassTag](\n",
    "    @transient private var _sc: SparkContext,\n",
    "    @transient private var deps: Seq[Dependency[_]]\n",
    "  ) extends Serializable with Logging {\n",
    "  // TODO ...\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "4c45d068-4ddc-451a-aeeb-75bca05bac26",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## RDD的五大特性\n",
    "\n",
    "RDD 数据结构内部有五个特性（摘录RDD 源码）：前三个特征每个RDD都具备的，后两个特征可选的。\n",
    "\n",
    "Internally, each RDD is characterized by five main properties:\n",
    "\n",
    "* A list of partitions.(RDD是有分区的)\n",
    "* A function for computing each split.(计算方法都会作用到每个分区上)\n",
    "* A list of dependencies on other RDDs.(RDD之间是有依赖关系的)\n",
    "* Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned).(KV型的RDD可以有分区器)\n",
    "* Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file).(RDD分区数据的读取会尽量靠近数据所在地)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "3d6557d4-1adf-4efa-8136-95fa33a656be",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### RDD是有分区的\n",
    "\n",
    "- A list of partitions\n",
    "\n",
    "分区是RDD数据存储的最小单位。  \n",
    "一份RDD的数据，本质上是分割成了多个分区。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "ce07af36-c830-49a2-865a-2e11eae8869b",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "# https://github.com/mesos/spark/pull/718\n",
    "print(\"默认情况下Spark的最小分区数：\", sc.defaultMinPartitions)\n",
    "rdd1 = sc.parallelize([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])\n",
    "print(\"RDD1的分区数：\", rdd1.getNumPartitions())\n",
    "print(\"RDD1的分区情况：\", rdd1.glom().collect())\n",
    "\n",
    "print(\"我们可以直接指定RDD的分区数：\")\n",
    "rdd2 = sc.parallelize([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 5)\n",
    "print(\"RDD2的分区数：\", rdd2.getNumPartitions())\n",
    "print(\"RDD2的分区情况：\", rdd2.glom().collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "7907928c-cff3-4906-a338-ef3fb1807a82",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### 计算方法都会作用到每个分区上\n",
    "\n",
    "- A function for computing each split\n",
    "\n",
    "以下代码中的 x * 10 会作用到每个分区的每个元素上。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "81132204-d53f-45db-95b4-bc827714b36f",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "rdd = sc.parallelize([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])\n",
    "print(\"RDD的分区情况：\", rdd.glom().collect())\n",
    "print(\"RDD的每个分区的每个元素乘10后的分区情况：\", rdd.map(lambda x: x * 10).glom().collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "4f102fd3-3c40-4cd8-817c-b43299d68a17",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### RDD之间是有依赖关系的\n",
    "\n",
    "- A list of dependencies on other RDDs\n",
    "\n",
    "如下代码之间是有依赖关系的  \n",
    "`textFile`->`rdd1`->`rdd2`->`rdd3`->`rdd4`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "14414ea4-b9c8-4f0d-a216-f5b255bed9f2",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "rdd1 = sc.textFile(dfs_endpoint + \"/word.txt\")\n",
    "rdd2 = rdd1.flatMap(lambda x: x.split(\" \"))\n",
    "rdd3 = rdd2.map(lambda x: (x, 1))\n",
    "rdd4 = rdd3.reduceByKey(lambda a, b: a + b)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "7aae859b-3e71-4ac7-84f4-06727d21df65",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "rdd4.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "8cef0194-02fc-4fd7-af13-bc1a7bceecab",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "rdd4.toDebugString()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "6f35f6c6-51a0-41dd-a9fc-af5997a71676",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### Key-Value型的RDD可以有分区器\n",
    "\n",
    "- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)\n",
    "\n",
    "Key-Value RDD：RDD中存储的是二元组，比如：(\"spark\", 3.2)。  \n",
    "默认分区器：Hash分区规则。  \n",
    "可以手动设置分区规则（rdd.partitionBy的方法来设置）。  \n",
    "\n",
    "> 由于不是所有RDD都是Key-Value型的，所以这个特性是可选的。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "19b8c970-2c6d-41cd-b2ea-e79e5298f59f",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "rdd = sc.parallelize([0,1,2,3,4,5,6,7,8,9],3).map(lambda x: (x, x % 4))\n",
    "print(rdd.glom().collect(), rdd.partitioner)\n",
    "\n",
    "rdd2 = rdd.partitionBy(3)\n",
    "print(rdd2.glom().collect(), rdd2.partitioner)\n",
    "\n",
    "rdd3 = rdd.partitionBy(3, lambda x: x % 3)\n",
    "print(rdd3.glom().collect(), rdd3.partitioner)\n",
    "\n",
    "rdd4 = rdd.partitionBy(3, lambda x: x % 2)\n",
    "print(rdd4.glom().collect(), rdd4.partitioner)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "2bcbf606-74e0-47fc-8850-b633da9b678c",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "%scala\n",
    "import org.apache.spark.HashPartitioner\n",
    "import org.apache.spark.RangePartitioner\n",
    "\n",
    "val rdd = sc.parallelize(List(0,1,2,3,4,5,6,7,8,9),3).map(x => (x, x))\n",
    "rdd.glom().collect().foreach(x => println(x.mkString(\",\")))\n",
    "println(rdd.partitioner)\n",
    "\n",
    "val rdd2 = rdd.partitionBy(new HashPartitioner(4))\n",
    "rdd2.glom().collect().foreach(x => println(x.mkString(\",\")))\n",
    "println(rdd2.partitioner)\n",
    "\n",
    "val rdd3 = rdd.partitionBy(new RangePartitioner(4, rdd))\n",
    "rdd3.glom().collect().foreach(x => println(x.mkString(\",\")))\n",
    "println(rdd3.partitioner)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "747156c8-0027-4ca8-b049-b742c99c67f7",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### RDD的分区规划，会尽量靠近数据所在的服务器\n",
    "\n",
    "- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)\n",
    "\n",
    "在初始RDD（读取数据的时候）规划的时候，分区会尽量规划到存储数据所在的服务器上。\n",
    "\n",
    "因为这样可以走`本地读取`，避免`网络读取`。\n",
    "\n",
    "本地读取：Executor所在的服务器，同样是一个DataNode，同时这个DataNode上有他要读取的数据，所以可以直接读取机器硬盘即可，无需网络传输。\n",
    "\n",
    "网络读取：读取数据，需要经过网络的传输才能读取到。\n",
    "\n",
    "本地读取的性能远大于网络读取的性能。\n",
    "\n",
    "> Spark会在`确保并行计算能力的前提下`，尽量确保本地读取。  \n",
    "> 这里是尽量确保，而不是100%确保。  \n",
    "> 所以这个特性也是可选的。"
   ]
  }
 ],
 "metadata": {
  "application/vnd.databricks.v1+notebook": {
   "dashboards": [],
   "language": "python",
   "notebookMetadata": {
    "pythonIndentUnit": 4
   },
   "notebookName": "RDD的介绍",
   "notebookOrigID": 2180139033493836,
   "widgets": {}
  },
  "kernelspec": {
   "display_name": "",
   "name": ""
  },
  "language_info": {
   "name": ""
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
